package com.bizmda.bizsip.app.client;

import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.bizmda.bizsip.common.*;
import com.bizmda.bizsip.config.AbstractSinkConfig;
import com.bizmda.bizsip.config.RabbitmqSinkConfig;
import com.bizmda.bizsip.config.RestSinkConfig;
import com.bizmda.bizsip.config.SinkConfigMapping;
import com.bizmda.log.trace.MDCTraceUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.conn.ConnectTimeoutException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.client.RestTemplate;

import java.lang.reflect.Method;
import java.net.SocketTimeoutException;

/**
 * @author shizhengye
 */
@Slf4j
public class SinkClientMethod {
    private final Method method;
    private final SinkClientProxy<?> sinkClientProxy;

    private final RestTemplate restTemplate;
    private final SinkConfigMapping sinkConfigMapping;
    private final RabbitTemplate rabbitTemplate;

    public SinkClientMethod(Method method, SinkClientProxy<?> sinkClientProxy) {
        this.method = method;
        this.sinkClientProxy = sinkClientProxy;
        this.sinkConfigMapping = SpringUtil.getBean(SinkConfigMapping.class);
        this.restTemplate = SpringUtil.getBean(RestTemplate.class);
        this.rabbitTemplate = SpringUtil.getBean(RabbitTemplate.class);
    }

    public Object execute(Object[] args) throws BizException {
        BizMessage<JSONObject> outMessage = this.callSink(this.sinkClientProxy.getSinkId(), args);
        if (outMessage.getCode() != 0) {
            throw new BizException(outMessage);
        }
        if (this.sinkClientProxy.getMapperInterface().equals(BizMessageInterface.class)) {
            return outMessage;
        }
        JSONObject jsonObject = new JSONObject(outMessage.getData());
        Object result = jsonObject.get("result");
        return BizTools.convertMethodReturnJson2Bean(method, result);
    }

    private BizMessage<JSONObject> callSink(String sinkId, Object[] args) throws BizException {
        JSONObject jsonObject = new JSONObject();
        if (this.sinkClientProxy.getMapperInterface().equals(BizMessageInterface.class)) {
            if (args[0] instanceof JSONObject) {
                jsonObject = (JSONObject) args[0];
            } else {
                jsonObject = JSONUtil.parseObj(args[0]);
            }
        } else {
            jsonObject.set("className", sinkClientProxy.getMapperInterface().getName());
            jsonObject.set("methodName", this.method.getName());
            jsonObject.set("params", JSONUtil.parseArray(args));
            JSONObject parametersTypes = BizTools.getParamtersTypesJsonObject(this.method, args);
            if (parametersTypes.size() > 0) {
                jsonObject.set("paramsTypes", parametersTypes);
            }
        }
        log.debug("调用Sink服务: {}",sinkId);
        log.trace("调用Sink服务请求报文:\n{}",BizUtils.buildJsonLog(jsonObject));
        BizMessage<JSONObject> inMessage = BizTools.bizMessageThreadLocal.get();
        inMessage.setData(jsonObject);

        AbstractSinkConfig sinkConfig = sinkConfigMapping.getSinkConfig(sinkId);
        BizMessage<JSONObject> outMessage;
        if (sinkConfig.getType() == AbstractSinkConfig.TYPE_REST) {
            RestSinkConfig restServerAdaptorConfig = (RestSinkConfig) sinkConfig;
            log.debug("调用同步Sink服务: {}", restServerAdaptorConfig.getUrl());
            try {
                outMessage = this.restTemplate.postForObject(restServerAdaptorConfig.getUrl(), inMessage, BizMessage.class);
            } catch (Exception e) {
                if ((e.getCause() instanceof SocketTimeoutException)
                        || (e.getCause() instanceof ConnectTimeoutException)) {
                    throw new BizException(BizResultEnum.OTHER_SERVICE_TIMEOUT,e);
                }
                throw e;
            }
            if (outMessage == null) {
                throw new BizException(BizResultEnum.OTHER_RETURN_NULL);
            }
            log.trace("调用Sink服务[{}]响应报文:\n{}",sinkId,BizUtils.buildBizMessageLog(outMessage));
            if (outMessage.getCode() != 0) {
                log.error("Sink服务返回错误:{}-{}\n{}", outMessage.getCode(),
                        outMessage.getMessage(),outMessage.getExtMessage());
                throw new BizException(outMessage);
            }
            if (!(outMessage.getData() instanceof JSONObject)) {
                outMessage.setData(JSONUtil.parseObj(outMessage.getData()));
            }
            return outMessage;
        } else if (sinkConfig.getType() == AbstractSinkConfig.TYPE_RABBITMQ) {
            RabbitmqSinkConfig rabbitmqSinkConfig = (RabbitmqSinkConfig) sinkConfig;
            log.debug("调用异步Sink服务: exchange[{}],route-key[{}]",
                    rabbitmqSinkConfig.getExchange(), rabbitmqSinkConfig.getRoutingKey());
            this.rabbitTemplate.convertAndSend(rabbitmqSinkConfig.getExchange(),
                    rabbitmqSinkConfig.getRoutingKey(), inMessage,
                    message -> {
                        message.getMessageProperties().setHeader(BizConstant.RABBITMQ_MESSAGE_HEADER_TRACE_ID, MDCTraceUtils.getTraceId());
                        return message;
                    });
            inMessage.setData(new JSONObject());
            return inMessage;
        } else {
            log.error("调用Sink服务[{}]出错:未知的Sink类型[{}]",sinkId,sinkConfig.getType());
            throw new BizException(BizResultEnum.OTHER_ERROR,
                    "调用Sink服务["+sinkId+"]出错：未知的Sink类型[" + sinkConfig.getType()+"]");
        }
    }

}
